Skip to main content

Unit Testing Rules: A Practical Guide

This guide shows how to unit test your custom rules locally using the ewx_public.testing module. With a single pip install ewx-public, you get everything you need — no mocks required.

Availability

The ewx_public.testing module is available from platform release 26.05 onwards (package version ewx-public==26.5.0). Pin to that version or later to use the testing utilities described in this guide.

Example project

For a complete working project with rules and tests, see the examples/my-ewx-rules/ directory in the ewx-public package.

1. Setup

Install ewx-public following the instructions in Local Rule Development, then add pytest to your dev dependencies:

[tool.poetry.group.dev.dependencies]
pytest = ">=7.0"
pytest-cov = ">=4.0"

2. Your first test

Suppose you have a production rule that converts kWh to MWh:

# rules/unit_conversion_rule.py — uploaded to the platform
from ewx_public.flow_rule import FlowRule
from ewx_public import RuleResult

class UnitConversionRule(FlowRule):
def prepare_context(self, **kwargs):
return {}

def apply(self, **kwargs):
source = self.flow_properties.get("source_channel", "E_CONS_kWh")
destination = self.flow_properties.get("destination_channel", "E_CONS_MWh")

converted = self.dataframe[[source]] / 1000.0
converted.columns = [destination]
self.store_timeseries(converted, channel_id=destination)

return RuleResult()

Test it with make_testable():

# tests/test_unit_conversion.py
from ewx_public.testing import (
assert_timeseries_stored,
make_datasource,
make_testable,
make_timeseries_df,
)
from rules.unit_conversion_rule import UnitConversionRule


def test_converts_kwh_to_mwh():
rule = make_testable(
UnitConversionRule,
datasource=make_datasource(channels=["E_CONS_kWh", "E_CONS_MWh"]),
dataframe=make_timeseries_df(columns=["E_CONS_kWh"], fill_value=1000.0),
flow_properties={
"source_channel": "E_CONS_kWh",
"destination_channel": "E_CONS_MWh",
},
)

rule.run()

assert_timeseries_stored(rule, channel_id="E_CONS_MWh", count=1)
stored = rule.backend.stored_timeseries[0]
assert (stored["timeseries"]["E_CONS_MWh"] == 1.0).all()

Key points:

  • make_testable(RuleClass, ...) wraps any FlowRule subclass for local testing
  • rule.run() executes the full lifecycle: prepare_context()apply()
  • assert_timeseries_stored() checks what the rule stored — no mock assertions needed
  • rule.backend holds all side effects for direct inspection

3. Arrange — Act — Assert

Every test follows the same structure:

  1. Arrange — create the rule with test data using make_testable() and factory functions
  2. Act — call rule.run() (full lifecycle) or rule.apply() (just the apply method)
  3. Assert — check what happened using assertion helpers or rule.backend

run() vs apply()

  • rule.run() — full lifecycle: calls prepare_context(), loads prepared datasources, validates flow_properties, then calls apply(), and validates the RuleResult. Use this for most tests.
  • rule.apply() — calls only apply(). Use this when you need to set up rule.context manually or test a specific code path:
def test_with_manual_context():
rule = make_testable(MyRule, datasource=ds, dataframe=df)
rule.context = {"custom_key": "value"}
rule.prepared_datasources = {"other-ds": other_ds}
result = rule.apply()

4. Factories — creating test data

Factory functions create test objects with sensible defaults:

from ewx_public.testing import (
make_datasource,
make_channel,
make_tag,
make_tag_property,
make_timeseries_df,
make_namespace,
make_flow_configuration,
)

# Datasource with channels as strings (auto-converted to Channel objects)
ds = make_datasource(
id="meter-001",
timezone="Europe/Amsterdam",
channels=["E_CONS", "E_PROD"],
)

# Timeseries DataFrame with DatetimeIndex (the format FlowRule.dataframe uses)
df = make_timeseries_df(
columns=["E_CONS", "E_PROD"],
start="2024-06-01",
periods=48,
freq="30min",
timezone="Europe/Amsterdam",
fill_value=1.5,
)

# Namespace (for rules that access self.namespace)
ns = make_namespace(id="my-company.com", name="My Company")

# Tag with properties
tag = make_tag(
tag="METER_TYPE",
properties=[
make_tag_property(key="type", value="smart"),
make_tag_property(key="brand", value="Landis+Gyr"),
],
)

Deterministic IDs

Factories auto-increment IDs (ds-1, ds-2, ...). For deterministic tests, reset them in a fixture:

# tests/conftest.py
import pytest
from ewx_public.testing import reset_factory_counters

@pytest.fixture(autouse=True)
def _reset_ids():
reset_factory_counters()

5. Assertion helpers

Assertion helpers check rule side effects with descriptive error messages:

from ewx_public.testing import (
assert_timeseries_stored,
assert_annotations_stored,
assert_tags_added,
assert_trigger_sent,
assert_email_sent,
assert_task_created,
assert_counter_value,
assert_logged,
)

# Timeseries
assert_timeseries_stored(rule, channel_id="E_CONS", count=1)
assert_timeseries_stored(rule, count=0) # assert nothing was stored

# Annotations
assert_annotations_stored(rule, channel_id="E_CONS", count=1)

# Tags
assert_tags_added(rule, datasource_id="meter-001", tag_name="VALIDATED")

# Triggers
assert_trigger_sent(rule, trigger_type="flow", count=1)

# Emails
assert_email_sent(rule, to_email="ops@example.com", subject_contains="Alert")

# Tasks
assert_task_created(rule, title_contains="Review gaps")

# Counters
assert_counter_value(rule, "processed_count", 42)

# Log messages
assert_logged(rule, level="warning", message_contains="Skipping")
assert_logged(rule, level="info", count=3)

The count parameter: None (default) = at least one, 0 = none, integer = exact count.

Direct backend inspection

For content-level assertions beyond what the helpers offer, inspect rule.backend directly:

stored = rule.backend.stored_timeseries[0]
assert stored["channel_id"] == "E_CONS"
assert stored["datasource_id"] == "meter-001"
assert (stored["timeseries"]["E_CONS"] > 0).all()

trigger = rule.backend.triggers[0]
assert trigger["type"] == "flow"
assert trigger["flow_config_id"] == 42

6. Seeding data for lookups

Rules often load data during execution (datasources, timeseries, channel classifiers). Seed the backend before running the rule:

rule = make_testable(MyRule, datasource=make_datasource(id="meter-001"))

# Seed a datasource for load_datasource() or prepare_datasource_ids
other_ds = make_datasource(id="weather-station")
rule.backend.add_datasource(other_ds)

# Seed timeseries for timeseries_service.get_latest()
historical_df = make_timeseries_df(
columns=["E_CONS"],
start="2023-12-01",
periods=720,
)
rule.backend.add_timeseries("meter-001", "E_CONS", historical_df)

# Seed a channel classifier for load_channel_classifier()
from ewx_public.testing import make_channel_classifier
rule.backend.add_channel_classifier(make_channel_classifier(name="E_CONS"))

rule.run()

7. Testing log messages

The default logger in make_testable() is a CapturingLogger that records all messages:

rule = make_testable(MyRule, ...)
rule.run()

# Using assertion helpers
assert_logged(rule, level="warning", message_contains="Skipping")

# Direct inspection
assert "Processing complete" in rule.rule_logger.infos
assert len(rule.rule_logger.warnings) == 1
assert rule.rule_logger.errors == []

8. Testing TransformRules

TransformRules are simpler — instantiate them directly (no make_testable() needed):

from rules.normalize_transform import NormalizeTransform
import pandas as pd

def test_rounds_to_three_decimals():
index = pd.date_range("2024-01-01", periods=3, freq="h", tz="UTC")
df = pd.DataFrame({"E_CONS": [1.23456, 2.34567, 3.45678]}, index=index)

rule = NormalizeTransform(dataframe=df, context={"decimals": 3})
result = rule.apply()

assert result.result["E_CONS"].tolist() == [1.235, 2.346, 3.457]

9. Testing error paths

Exceptions

Test that your rule raises FlowCancelException or FlowStopException correctly:

import pytest
from ewx_public import FlowCancelException

def test_cancels_on_invalid_data():
rule = make_testable(MyRule, dataframe=pd.DataFrame()) # empty
with pytest.raises(FlowCancelException):
rule.run()

Simulating service failures

Use raising() to make a method raise when called:

from ewx_public.testing import raising

rule = make_testable(MyRule, ...)
rule.publish_to_custom_pubsub_topic = raising(ValueError, "Topic not found")

with pytest.raises(ValueError, match="Topic not found"):
rule.run()

10. Running tests

# Run all tests
pytest

# Run with verbose output
pytest -v

# Run a single test file
pytest tests/test_my_rule.py

# Run with coverage
pytest --cov=rules --cov-report=term-missing

11. Migrating from MagicMock

If you have existing tests using MagicMock, here's how to migrate:

Old pattern (MagicMock)New pattern (ewx_public.testing)
rule = MyRule(datasource=MagicMock(), ...)rule = make_testable(MyRule, datasource=make_datasource(), ...)
rule.rule_logger = MagicMock()Remove — CapturingLogger is the default
rule.store_timeseries = MagicMock()Remove — use rule.backend.stored_timeseries
rule.store_timeseries.assert_called_once()assert_timeseries_stored(rule, count=1)
rule.load_datasource = MagicMock(return_value=ds)rule.backend.add_datasource(ds)
rule.services = MagicMock()Remove — NullObject is the default
namespace=MagicMock(id="ns")namespace=make_namespace(id="ns")
MagicMock(side_effect=ValueError(...))raising(ValueError, "message")
tip

The goal is zero unittest.mock imports. The testing module provides replacements for every common mock pattern.

Summary

WhatImport
Wrap a rule for testingfrom ewx_public.testing import make_testable
Create test datafrom ewx_public.testing import make_datasource, make_timeseries_df
Assert side effectsfrom ewx_public.testing import assert_timeseries_stored, assert_tags_added
Assert log messagesfrom ewx_public.testing import assert_logged
Simulate failuresfrom ewx_public.testing import raising
Reset factory IDsfrom ewx_public.testing import reset_factory_counters

For the complete API reference, see the ewx-public Package page.